
feat: Event-Driven DA Follower with WebSocket Subscriptions#3131

Open
alpe wants to merge 14 commits into main from alex/2803_best_2worlds

Conversation

@alpe alpe commented Mar 3, 2026

Summary

Replaces the Syncer's polling-based DA worker with an event-driven DAFollower that subscribes to DA header and data events in real time. This eliminates unnecessary polling latency in follow mode and brings zero-latency block processing when the node is caught up with the DA layer.

Changes

Core: Event-Driven DAFollower (block/internal/syncing/)

  • Introduces DAFollower, a new component that subscribes to both the header and data namespaces of the DA layer and processes them inline, achieving zero-latency follow mode (a lifecycle sketch follows this list).
  • Adds an inline blob processing path so blobs arriving with their header are handled immediately without a separate retrieval round-trip.
  • Adds a subscription watchdog that detects stalled DA subscriptions and triggers recovery.
  • Refactors the Syncer to delegate event-driven follow logic to DAFollower; the old polling worker is removed.
  • Extends the DARetriever interface and its tracing/mock implementations to support the new subscription flow.
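
Below is a minimal lifecycle sketch of the follower described above. Names follow the PR's terminology, but the types and loop bodies are simplified stand-ins, not the actual code in block/internal/syncing/da_follower.go:

package syncing

import (
	"context"
	"sync"
)

// DAFollower is the lifecycle surface the Syncer interacts with.
type DAFollower interface {
	Start(ctx context.Context) error
	Stop()
}

// daFollower sketches the two-goroutine model: followLoop consumes live
// subscription events, catchupLoop sequentially drains the backlog.
type daFollower struct {
	catchupSignal chan struct{} // buffered (cap 1): a pre-start signal is never lost
	wg            sync.WaitGroup
	cancel        context.CancelFunc
}

func newDAFollower() *daFollower {
	return &daFollower{catchupSignal: make(chan struct{}, 1)}
}

func (f *daFollower) Start(ctx context.Context) error {
	ctx, f.cancel = context.WithCancel(ctx)
	f.wg.Add(2)
	go f.followLoop(ctx)
	f.signalCatchup() // bootstrap catch-up before the first subscription event arrives
	go f.catchupLoop(ctx)
	return nil
}

func (f *daFollower) Stop() {
	if f.cancel != nil {
		f.cancel()
	}
	f.wg.Wait()
}

func (f *daFollower) signalCatchup() {
	select {
	case f.catchupSignal <- struct{}{}:
	default: // a catch-up is already pending; one signal is enough
	}
}

// Stubs: the real loops subscribe to the DA layer and fetch the backlog.
func (f *daFollower) followLoop(ctx context.Context)  { defer f.wg.Done(); <-ctx.Done() }
func (f *daFollower) catchupLoop(ctx context.Context) { defer f.wg.Done(); <-ctx.Done() }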

DA Client: WebSocket-Based JSON-RPC (pkg/da/, block/internal/da/)

  • Updates the JSON-RPC DA client to use WebSockets for subscriptions, replacing the previous HTTP-polling approach.
  • Adds a proper WebSocket constructor to the client so callers can opt into streaming (a scheme-conversion sketch follows this list).
  • Extends DA types with subscription-relevant fields.
  • Security hardening on the DA subscription path.
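
Per the file summary below, call sites switch to blobrpc.NewWSClient(ctx, addr, token, authHeaderName) and the client gains an http→ws conversion helper. The snippet below only illustrates that conversion; the actual helper in pkg/da/jsonrpc/client.go may use a different name and handle more edge cases:

package jsonrpc

import (
	"fmt"
	"net/url"
)

// httpToWS shows the kind of scheme rewrite needed before dialing a
// WebSocket endpoint: http -> ws, https -> wss. Hypothetical helper name.
func httpToWS(addr string) (string, error) {
	u, err := url.Parse(addr)
	if err != nil {
		return "", fmt.Errorf("parse DA address %q: %w", addr, err)
	}
	switch u.Scheme {
	case "http":
		u.Scheme = "ws"
	case "https":
		u.Scheme = "wss"
	case "ws", "wss":
		// already a WebSocket URL; leave it unchanged
	default:
		return "", fmt.Errorf("unsupported scheme %q in DA address", u.Scheme)
	}
	return u.String(), nil
}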

Local DA Tooling (tools/local-da/)

  • Adds blob subscription support to the local dummy DA, enabling full E2E testing of the event-driven path without a live Celestia node.
  • Extends local.go with subscription broadcasting and rpc.go with the corresponding RPC endpoints (a broadcast sketch follows this list).
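
A broadcast mechanism of the kind described above could look like the sketch below: each Subscribe call registers a buffered channel, and each submission notifies every registered subscriber. The pubsub type and its methods are hypothetical stand-ins, not the actual local.go code:

package localda

import (
	"context"
	"sync"
)

// SubscriptionEvent is a stand-in for the event type pushed to subscribers.
type SubscriptionEvent struct {
	Height uint64
	Blobs  [][]byte
}

// pubsub sketches a subscriber registry for the local dummy DA.
type pubsub struct {
	mu          sync.Mutex
	subscribers []chan SubscriptionEvent
}

func (p *pubsub) Subscribe(ctx context.Context) <-chan SubscriptionEvent {
	ch := make(chan SubscriptionEvent, 8)
	p.mu.Lock()
	p.subscribers = append(p.subscribers, ch)
	p.mu.Unlock()

	// Remove and close the channel once the caller's context is cancelled.
	go func() {
		<-ctx.Done()
		p.mu.Lock()
		for i, s := range p.subscribers {
			if s == ch {
				p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
				close(ch) // close only if this goroutine removed it
				break
			}
		}
		p.mu.Unlock()
	}()
	return ch
}

func (p *pubsub) notify(ev SubscriptionEvent) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for _, ch := range p.subscribers {
		select {
		case ch <- ev:
		default: // drop rather than block the submit path
		}
	}
}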

Test Infrastructure

  • New da_retriever_mock.go and updates to test/mocks/da.go and test/testda/dummy.go to cover subscription interfaces.
  • syncer_backoff_test.go and syncer_test.go updated and significantly extended to cover the new event-driven flows.
  • E2E tests updated for new evnode flags and P2P address retrieval patterns.

Summary by CodeRabbit

  • New Features

    • WebSocket-based blob client and subscription constructor for real-time DA streams
    • Blob subscription API and SubscriptionEvent type for streaming DA notifications
    • DA follower for continuous catch-up, inline blob processing, and backoff handling
    • Local/test DA and dummy/test helpers now support publish/subscribe notifications
    • Public ProcessBlobs API for processing raw blobs into height events
  • Bug Fixes

    • Corrected full node startup configuration flag names and startup ordering
  • Tests

    • Expanded tests for catch-up, backoff, inline processing, and subscription flows

alpe added 3 commits March 3, 2026 13:02
…ollower and introduce DA client subscription.
…w mode

When the DA subscription delivers blobs at the current local DA height,
the followLoop now processes them inline via ProcessBlobs — avoiding
a round-trip re-fetch from the DA layer.

Architecture:
- followLoop: processes subscription blobs inline when caught up (fast path),
  falls through to catchupLoop when behind (slow path).
- catchupLoop: unchanged — sequential RetrieveFromDA() for bulk sync.

Changes:
- Add Blobs field to SubscriptionEvent for carrying raw blob data
- Add extractBlobData() to DA client Subscribe adapter
- Export ProcessBlobs on DARetriever interface
- Add handleSubscriptionEvent() to DAFollower with inline fast path
- Add TestDAFollower_InlineProcessing with 3 sub-tests
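
A simplified reconstruction of the fast/slow split described above is shown below. Names (localNextDAHeight, ProcessBlobs) follow the PR's terminology, the types are local stand-ins, and the rollback already uses the CAS form recommended later in the review discussion instead of an unconditional Store:

package syncing

import (
	"context"
	"sync/atomic"
)

// Stand-in types for illustration only; the real definitions live in
// block/internal/syncing and pkg/da/types.
type SubscriptionEvent struct {
	Height uint64
	Blobs  [][]byte
}

type DAHeightEvent struct{ DAHeight uint64 }

type blobProcessor interface {
	ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []DAHeightEvent
}

type follower struct {
	localNextDAHeight atomic.Uint64
	retriever         blobProcessor
	pipeEvent         func(context.Context, DAHeightEvent) error
	catchupSignal     chan struct{}
}

// handleSubscriptionEvent: process inline when the event is the next local
// height, otherwise hand off to catchupLoop.
func (f *follower) handleSubscriptionEvent(ctx context.Context, ev SubscriptionEvent) {
	// Fast path: CAS doubles as an ownership claim on this height, so the two
	// loops never process the same height twice.
	if f.localNextDAHeight.CompareAndSwap(ev.Height, ev.Height+1) {
		events := f.retriever.ProcessBlobs(ctx, ev.Blobs, ev.Height)
		if len(events) == 0 {
			// Nothing complete yet (e.g. split namespaces): release the claim,
			// but only if nothing else advanced the counter meanwhile.
			f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)
			return
		}
		for _, e := range events {
			if err := f.pipeEvent(ctx, e); err != nil {
				f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)
				return
			}
		}
		return
	}
	// Slow path: we are behind this event; wake catchupLoop.
	select {
	case f.catchupSignal <- struct{}{}:
	default:
	}
}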
When header and data use different DA namespaces, the DAFollower now
subscribes to both and merges events via a fan-in goroutine. This ensures
inline blob processing works correctly for split-namespace configurations.

Changes:
- Add DataNamespace to DAFollowerConfig and daFollower
- Subscribe to both namespaces in runSubscription with mergeSubscriptions fan-in
- Guard handleSubscriptionEvent to only advance localDAHeight when
  ProcessBlobs returns at least one complete event (header+data matched)
- Pass DataNamespace from syncer.go
- Implement Subscribe on DummyDA test helper with subscriber notification
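
The fan-in mentioned above can be as simple as the sketch below: forward both namespace subscriptions into a single channel and close it once both inputs are done. This is an illustrative reconstruction, not the exact mergeSubscriptions in da_follower.go:

package syncing

import "sync"

// SubscriptionEvent is the stand-in event type used in these sketches.
type SubscriptionEvent struct {
	Height uint64
	Blobs  [][]byte
}

// mergeSubscriptions fans the header and data subscriptions into one channel,
// closing the output only after both inputs are closed.
func mergeSubscriptions(headerCh, dataCh <-chan SubscriptionEvent) <-chan SubscriptionEvent {
	out := make(chan SubscriptionEvent, 16) // small buffer to absorb bursts
	var wg sync.WaitGroup
	forward := func(in <-chan SubscriptionEvent) {
		defer wg.Done()
		for ev := range in {
			out <- ev
		}
	}
	wg.Add(2)
	go forward(headerCh)
	go forward(dataCh)
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}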

github-actions bot commented Mar 3, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build: ✅ passed · Format: ⏩ skipped · Lint: ✅ passed · Breaking: ✅ passed · Updated (UTC): Mar 6, 2026, 12:38 PM

coderabbitai bot commented Mar 3, 2026

📝 Walkthrough

Walkthrough

Adds WebSocket DA subscriptions and a subscriber-based publish/subscribe path; introduces DAFollower to subscribe and drive inline processing and sequential catch‑up; propagates Subscribe through DA client interfaces, mocks, and local/test DA implementations; replaces several blob client constructors with NewWSClient.

Changes

Cohort / File(s) Summary
Blob client constructor updates
apps/evm/cmd/run.go, apps/grpc/cmd/run.go, apps/testapp/cmd/run.go, pkg/cmd/run_node.go
Replaced blobrpc.NewClient(...) with blobrpc.NewWSClient(...) (WS URL conversion) and adjusted context/Close usage where applicable.
DA client interface & JSON-RPC WS client
block/internal/da/interface.go, block/internal/da/client.go, block/internal/da/tracing.go, pkg/da/jsonrpc/client.go
Added Subscribe(ctx, namespace) to DA client interface and implementations; added NewWSClient(ctx, addr, token, authHeaderName) and http→ws conversion helper.
Subscription event type
pkg/da/types/types.go
Added SubscriptionEvent{Height uint64; Blobs [][]byte} for Subscribe producers/consumers.
DAFollower implementation
block/internal/syncing/da_follower.go
New DAFollower interface, config, constructor and concrete implementation with followLoop (subscriptions), catchupLoop, backoff, inline processing and lifecycle methods.
DARetriever API & internals
block/internal/syncing/da_retriever.go, block/internal/syncing/da_retriever_tracing.go, block/internal/syncing/da_retriever_mock.go, block/internal/syncing/da_retriever_tracing_test.go
Added ProcessBlobs(ctx, blobs, daHeight) []common.DAHeightEvent, refactored internal processing (mutex/atomic, dedupe), and added tracing/mocks support.
Syncer integration & tests/benchmarks
block/internal/syncing/syncer.go, block/internal/syncing/syncer_backoff_test.go, block/internal/syncing/syncer_benchmark_test.go, block/internal/syncing/syncer_test.go
Replaced internal daWorkerLoop with DAFollower field; Start/Stop now create/stop follower; tests/benchmarks updated to use DAFollower flows and assertions.
Mocks & test DA implementations
test/mocks/da.go, apps/evm/server/force_inclusion_test.go, block/internal/da/tracing_test.go, test/testda/dummy.go, tools/local-da/local.go, tools/local-da/rpc.go
Added Subscribe to mocks and test DAs; implemented subscriber registries, notify semantics, and streaming Subscribe behavior; updated mock scaffolding for ProcessBlobs.
Local DA RPC changes
tools/local-da/rpc.go
Converted Subscribe from closed-channel no-op to streaming subscription supporting context cancellation and namespace filtering; notify subscribers on Submit and empty submissions.
E2E and test CLI updates
test/e2e/evm_force_inclusion_e2e_test.go, test/e2e/evm_test_common.go
Full node connect target changed to sequencer full P2P address; test flags switched from rollkit.* to evnode.* equivalents.
Tests: small updates
apps/evm/server/force_inclusion_test.go, block/internal/syncing/da_retriever_strict_test.go
Added mock Subscribe method for test DA; updated strictMode assertions to use atomic.Load where applicable.

Sequence Diagram

sequenceDiagram
    participant Syncer
    participant DAFollower
    participant DAClient as "DA Client (WS)"
    participant Retriever
    participant Pipeline as "Event Pipeline"

    Syncer->>DAFollower: Start(ctx)
    activate DAFollower

    DAFollower->>DAClient: Subscribe(ctx, namespace)
    DAClient-->>DAFollower: subscription channel (SubscriptionEvent)

    loop on subscription event
        DAFollower->>DAFollower: updateHighest(height)
        alt inline (localNext == height)
            DAFollower->>Retriever: ProcessBlobs(ctx, blobs, height)
            Retriever-->>DAFollower: []DAHeightEvent
            DAFollower->>Pipeline: pipe events
        else backlog
            DAFollower->>DAFollower: signal catchup
        end
    end

    loop catch-up sequence
        DAFollower->>Retriever: RetrieveFromDA(ctx, height)
        Retriever-->>DAFollower: blobs / Err
        DAFollower->>Retriever: ProcessBlobs(ctx, blobs, height)
        Retriever-->>DAFollower: []DAHeightEvent
        DAFollower->>Pipeline: pipe events
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

T:testing

Suggested reviewers

  • tuxcanfly
  • tac0turtle

Poem

🐇 I tuned my ears to WebSocket streams,
Blobs hopped in lines beneath moonbeams.
I follow heights and pipe each run,
Inline or catch‑up, hop, fetch, and done. 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: coverage is 79.17%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (2 passed)
  • Title check ✅ Passed: the title 'feat: Event-Driven DA Follower with WebSocket Subscriptions' accurately summarizes the main change: introducing an event-driven DAFollower component with WebSocket support to replace the polling-based DA worker.
  • Description check ✅ Passed: the description covers all major changes (event-driven DAFollower, WebSocket support, local DA tooling, and test infrastructure) with clear organization and context.




claude bot commented Mar 3, 2026

Claude finished @alpe's task in 2m 52s.


Code Review: Event-Driven DA Follower with WebSocket Subscriptions

  • Gather context and understand PR scope
  • Review core DA follower implementation (block/internal/syncing/da_follower.go)
  • Review DA retriever changes (block/internal/syncing/da_retriever.go)
  • Review syncer integration (block/internal/syncing/syncer.go)
  • Review DA client/interface changes (block/internal/da/)
  • Review local DA tooling (tools/local-da/)
  • Review test infrastructure
  • Provide final comprehensive feedback

This is a well-structured and impactful PR that replaces polling with an event-driven subscription model. The two-goroutine design (followLoop + catchupLoop) with atomic coordination is sound. Below are my findings, grouped by severity.


Critical / Correctness

1. Atomic rollback can regress localNextDAHeight — three sites

da_follower.go:266, da_follower.go:280, da_follower.go:365

All three rollback paths use an unconditional Store:

f.localNextDAHeight.Store(ev.Height)   // line 266: pipeEvent failure in followLoop
f.localNextDAHeight.Store(ev.Height)   // line 280: "no complete events" branch
f.localNextDAHeight.Store(local)       // line 365: fetchAndPipeHeight failure in catchupLoop

The problem: between the moment the goroutine CAS'd local → local+1 and the moment it decides to roll back, the other goroutine may have further advanced the counter (e.g. inline processing advanced it to local+2). A plain Store(local) then moves it backward, causing the height to be re-fetched.

The fix is to use CAS for the rollback, so it only applies if nothing else has moved the counter:

// e.g. line 266 — only roll back if we still "own" the next slot
f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)


2. testda/dummy.go — the potential double-close is now fixed, but height increment ordering leaves an inconsistency window

The current Subscribe goroutine only closes ch inside the if s == sub block — so the double-close risk from Reset() is avoided. ✅

However, in StartHeightTicker (line 300), d.height.Add(1) fires before d.mu.Lock(), so GetLatestDAHeight() can observe the new height before d.headers[height] is set or subscribers are notified. This creates a brief inconsistency window (not a hard bug, but a subtle test hazard):

// Current (inconsistent window):
height := d.height.Add(1)  // visible immediately
d.mu.Lock()
if d.headers[height] == nil { ... }
d.notifySubscribers(...)
d.mu.Unlock()

Moving d.height.Add(1) inside the lock would eliminate this.


Major / Design

3. mergeSubscriptions buffer may drop events under backpressure

da_follower.go:217

out := make(chan datypes.SubscriptionEvent, 16)

If handleSubscriptionEvent takes too long (e.g. ProcessBlobs is slow), the merge goroutine blocks on the out <- ev send, because the goroutine that drains out is busy inside the handler. The watchdog timer then triggers and reconnects, causing duplicate or lost events. Consider increasing the buffer or documenting the accepted backpressure contract.

4. runCatchup priority-height draining is a busy-spin under load

da_follower.go:330–333

for priorityHeight > 0 && priorityHeight < f.localNextDAHeight.Load() {
    priorityHeight = f.retriever.PopPriorityHeight()
}

If many stale hints have been queued (e.g. a burst of P2P hints during a long catchup), this loop spins on the mutex. This is fine for typical workloads but worth noting for production deployments with high P2P hint rates.

5. tools/local-da: SubmitWithOptions and Submit silently swallow blob conversion errors

local.go:234–236, local.go:275–277

if b, err := blobrpc.NewBlobV0(nspace, blob); err == nil {
    rpcBlobs[i] = b
}
// rpcBlobs[i] is nil if NewBlobV0 fails

Subscribers then receive events with nil blob entries. While this is test-only tooling, it can cause silent test failures in E2E scenarios. Even a panic(err.Error()) or log.Fatal would be clearer for test code.


Minor / Style

6. da_follower.go: watchdogMultiplier const placed oddly

da_follower.go:435

const watchdogMultiplier = 3

func (f *daFollower) watchdogTimeout() time.Duration {

The const is interleaved with the method it only applies to. Consider moving it into the method body or the top-level const block.

7. da_retriever.go: ProcessBlobs threading comment

da_retriever.go:201–202

The comment says "NOT thread-safe: the caller (DAFollower) must ensure exclusive access via CAS". This is technically accurate, but slightly misleading — the pendingMu mutex is taken inside processBlobs, making the map operations thread-safe internally. The comment means "don't call ProcessBlobs for the same height from two goroutines concurrently" — this distinction could be clearer.

8. da_follower.go: NewDAFollower should validate required fields

da_follower.go:89

cfg.Client, cfg.Retriever, and cfg.PipeEvent are all nil-deref panics if omitted. A quick upfront check with a meaningful panic message would make misconfiguration easier to diagnose than a nil pointer dereference deep in a goroutine.

9. da_follower.go: signalCatchup() is called before the catchupLoop goroutine starts

da_follower.go:112–115

f.wg.Add(2)
go f.followLoop(ctx)
f.signalCatchup()   // fires before catchupLoop is scheduled
go f.catchupLoop(ctx)

This is safe because catchupSignal is buffered (cap 1), so the signal is not lost. A brief comment explaining that the buffered-channel bootstrap is intentional would help, as it's non-obvious.


Positive Observations

  • Architecture is clean: The two-goroutine model (followLoop / catchupLoop) with only atomic shared state and a single buffered signal channel is elegant and easy to reason about.
  • The CAS-based height claim for inline processing in handleSubscriptionEvent correctly prevents races between the two loops for the same DA height.
  • Watchdog timer adds good resilience against stalled subscriptions.
  • signalCatchup() at startup bootstraps catchup without waiting for the first subscription event — a subtle but important detail.
  • extractBlobData in client.go correctly guards against oversized/nil blobs from the subscription stream.
  • Deferred s.Stop() in Syncer.Start() correctly cleans up all goroutines on partial startup failure.
  • Test use of testing/synctest is a great fit for these time-sensitive scenarios and avoids flaky sleeps.

Summary

The most important fix before merge is item #1 (the atomic rollback races). Items 2–5 are worth addressing but less likely to be triggered in practice. The rest are polish.

If no subscription events arrive within 3× the DA block time (default
30s), the watchdog triggers and returns an error. The followLoop then
reconnects the subscription with the standard backoff. This prevents
the node from silently stopping sync when the DA subscription stalls
(e.g., network partition, DA node freeze).
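
A stall detector along these lines could look like the following sketch. The constant and error names are assumptions; the actual watchdog in da_follower.go is wired into followLoop rather than a standalone helper:

package syncing

import (
	"context"
	"errors"
	"time"
)

// SubscriptionEvent is the stand-in event type used in these sketches.
type SubscriptionEvent struct {
	Height uint64
	Blobs  [][]byte
}

var errSubscriptionStalled = errors.New("da subscription stalled")

const watchdogMultiplier = 3

// consumeWithWatchdog returns an error if no event arrives within
// watchdogMultiplier x the DA block time, so the caller can reconnect
// with its standard backoff.
func consumeWithWatchdog(ctx context.Context, events <-chan SubscriptionEvent, daBlockTime time.Duration, handle func(SubscriptionEvent)) error {
	timeout := watchdogMultiplier * daBlockTime
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev, ok := <-events:
			if !ok {
				return errSubscriptionStalled // channel closed by the producer
			}
			handle(ev)
			// Reset the watchdog after every event.
			if !timer.Stop() {
				<-timer.C
			}
			timer.Reset(timeout)
		case <-timer.C:
			return errSubscriptionStalled
		}
	}
}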

codecov bot commented Mar 3, 2026

Codecov Report

❌ Patch coverage is 30.50847% with 205 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.25%. Comparing base (c0bc141) to head (191ae01).
⚠️ Report is 5 commits behind head on main.

Files with missing lines Patch % Lines
block/internal/syncing/da_follower.go 38.97% 112 Missing and 7 partials ⚠️
block/internal/da/client.go 0.00% 36 Missing ⚠️
block/internal/syncing/syncer.go 3.70% 25 Missing and 1 partial ⚠️
block/internal/syncing/da_retriever.go 59.09% 6 Missing and 3 partials ⚠️
pkg/da/jsonrpc/client.go 0.00% 6 Missing ⚠️
pkg/cmd/run_node.go 0.00% 5 Missing ⚠️
block/internal/da/tracing.go 0.00% 2 Missing ⚠️
block/internal/syncing/da_retriever_tracing.go 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3131      +/-   ##
==========================================
- Coverage   60.87%   60.25%   -0.62%     
==========================================
  Files         113      114       +1     
  Lines       11695    11888     +193     
==========================================
+ Hits         7119     7163      +44     
- Misses       3773     3918     +145     
- Partials      803      807       +4     
Flag Coverage Δ
combined 60.25% <30.50%> (-0.62%) ⬇️

Flags with carried forward coverage won't be shown.


alpe added 2 commits March 3, 2026 15:24
…ient to use WebSockets, along with E2E test updates for new `evnode` flags and P2P address retrieval.
@github-actions github-actions bot left a comment

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'Spamoor Trace Benchmarks'.
The benchmark result of this commit is worse than the previous result, exceeding the threshold of 1.50.

Benchmark suite | Current: dd7e0cd | Previous: 2c75e9e | Ratio
SpamoorSmoke - Batch.SetHeight (avg) | 15.020408163265307 us | 8.824561403508772 us | 1.70
SpamoorSmoke - Store.GetBlockData (avg) | 13.59433962264151 us | 7.780701754385965 us | 1.75

This comment was automatically generated by workflow using github-action-benchmark.

alpe added 4 commits March 4, 2026 14:13
* main:
  chore: add stricter linting (#3132)
  feat(benchmarking): adding ERC20 benchmarking test (#3114)
  feat: ensure p2p DAHint within limits (#3128)
@alpe alpe changed the title [WIP] adding sync mode feat: Event-Driven DA Follower with WebSocket Subscriptions Mar 4, 2026
@alpe alpe marked this pull request as ready for review March 4, 2026 17:23
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 7

🧹 Nitpick comments (4)
block/internal/da/interface.go (1)

20-23: Clarify the post-cancellation drain contract in the comment.

“Callers MUST drain the channel after cancellation” can be interpreted differently; consider explicitly stating the expected sequence (cancel context, keep receiving until channel closes).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/interface.go` around lines 20 - 23, Update the comment for
Subscribe to explicitly describe the post-cancellation drain sequence: state
that callers should cancel the provided ctx, then continue receiving from the
returned <-chan datypes.SubscriptionEvent (e.g., range over the channel) until
it is closed by the implementation; clarify that the channel will be closed once
cancellation is observed and that draining (receiving until close) prevents
goroutine/resource leaks. Reference the Subscribe(ctx context.Context, namespace
[]byte) signature and datypes.SubscriptionEvent in the comment.
block/internal/syncing/da_retriever_tracing.go (1)

67-69: Consider instrumenting ProcessBlobs in the tracing wrapper.

This new hot path currently emits no span/attributes, which limits visibility into inline blob processing latency and output volume.

Suggested refactor
 func (t *tracedDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
-	return t.inner.ProcessBlobs(ctx, blobs, daHeight)
+	ctx, span := t.tracer.Start(ctx, "DARetriever.ProcessBlobs",
+		trace.WithAttributes(
+			attribute.Int64("da.height", int64(daHeight)),
+			attribute.Int("blob.count", len(blobs)),
+		),
+	)
+	defer span.End()
+
+	events := t.inner.ProcessBlobs(ctx, blobs, daHeight)
+	span.SetAttributes(attribute.Int("event.count", len(events)))
+	return events
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_retriever_tracing.go` around lines 67 - 69, The
tracing wrapper tracedDARetriever.ProcessBlobs is not creating any span or
attributes, so add instrumentation around the call to t.inner.ProcessBlobs:
start a span (using the same tracer used elsewhere in this file), set attributes
for blob_count (len(blobs)), da_height (daHeight) and optionally
total_blob_bytes (sum of len for each blob), call t.inner.ProcessBlobs(ctx,
blobs, daHeight), record the resulting events count as an attribute (e.g.,
da_events_count) and any error/status if applicable, then end the span; keep the
wrapper behavior identical except for adding the span and attributes to improve
visibility into latency and output volume.
apps/evm/server/force_inclusion_test.go (1)

53-58: Prefer a mockery-generated DA mock over extending the hand-written stub.

This local Subscribe stub works, but it increases interface-drift maintenance as DA APIs evolve. Using the generated DA mock keeps contract changes centralized.

As per coding guidelines, "Mock external dependencies using mockery".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/evm/server/force_inclusion_test.go` around lines 53 - 58, Replace the
hand-written Subscribe implementation on mockDA with the mockery-generated DA
mock: remove or stop using the local mockDA.Subscribe stub and instead import
and instantiate the mockery-created mock (e.g., MockDA) and use its EXPECT/On
setup to return a closed (<-chan da.SubscriptionEvent) or desired channel for
the test; update test setup where mockDA is constructed to use the mockery mock
and set the Subscribe return behavior via the mock's EXPECT/On methods so the
external DA contract is maintained centrally.
block/internal/da/tracing.go (1)

148-150: Consider tracing subscription setup for consistency.

Subscribe currently bypasses the tracing pattern used by the rest of this wrapper, so subscribe failures won’t be visible in DA spans.

Suggested refactor
 func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
-	return t.inner.Subscribe(ctx, namespace)
+	ctx, span := t.tracer.Start(ctx, "DA.Subscribe",
+		trace.WithAttributes(
+			attribute.Int("ns.length", len(namespace)),
+			attribute.String("da.namespace", hex.EncodeToString(namespace)),
+		),
+	)
+	defer span.End()
+
+	ch, err := t.inner.Subscribe(ctx, namespace)
+	if err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, err.Error())
+		return nil, err
+	}
+	return ch, nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/da/tracing.go` around lines 148 - 150, tracedClient.Subscribe
bypasses the wrapper's tracing; update the method to create a DA span (matching
the tracing pattern used elsewhere in this file), start the span with the
incoming ctx, add relevant attributes (e.g., namespace), call
t.inner.Subscribe(ctx, namespace), record any returned error on the span, and
end the span before returning the channel and error so subscription setup
failures are captured in DA spans; reference the tracedClient.Subscribe method
and t.inner.Subscribe call when making this change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/evm/cmd/run.go`:
- Around line 63-66: The WS dial is using context.Background() when calling
blobrpc.NewWSClient which prevents CLI cancellation from propagating; replace
the background context with the command-scoped context (use cmd.Context() or a
derived context) when calling blobrpc.NewWSClient so that the DA WebSocket
handshake is canceled on CLI shutdown and respects timeouts; update the call
site where blobrpc.NewWSClient(context.Background(), nodeConfig.DA.Address,
nodeConfig.DA.AuthToken, "") is invoked to pass cmd.Context() (or ctx :=
cmd.Context() / ctx, cancel := context.WithTimeout(cmd.Context(), ...) if you
need a timeout) instead.

In `@block/internal/syncing/da_follower.go`:
- Around line 265-267: The unconditional f.localDAHeight.Store(ev.Height) can
regress localDAHeight if runCatchup advanced it concurrently; change the
rollback to a CAS so we only set it back when the current value equals the value
we expect to overwrite. Replace the Store(ev.Height) rollback with a
compare-and-swap using the atomic type/method (e.g.,
f.localDAHeight.CompareAndSwap(expectedPrev, ev.Height) or
atomic.CompareAndSwapUint64) where expectedPrev is the value read before
attempting the rollback; apply the same CAS pattern in both places noted around
the runCatchup logic to avoid moving localDAHeight backwards.
- Around line 271-273: The code sets f.headReached.Store(true) whenever inline
processing yields events, which can be incorrect if highestSeenDAHeight is still
ahead; modify the condition around the len(events) branch in da_follower.go (the
block using ev.Height and f.headReached) to only set f.headReached when the
event height indicates we are actually caught up (e.g., ev.Height >=
f.highestSeenDAHeight or when highestSeenDAHeight is unset/zero), otherwise do
not flip headReached; update the conditional that logs with f.logger.Debug() so
the headReached.Store(true) call is guarded by this explicit comparison to
highestSeenDAHeight.

In `@block/internal/syncing/syncer_benchmark_test.go`:
- Around line 46-61: The test starts goroutines with b.Context() which doesn't
get cancelled by the fixture teardown (fixt.s.cancel()), causing potential
goroutine leaks; change all uses of b.Context() when launching long-lived worker
goroutines to use the fixture's cancelable context (fixt.s.ctx) instead —
specifically pass fixt.s.ctx into fixt.s.processLoop, follower.runCatchup (and
the NewDAFollower call site that spawns it), and fixt.s.startSyncWorkers; apply
the same replacement for the other occurrences mentioned (lines near 64-69 and
78-79) so teardown via fixt.s.cancel() and fixt.s.wg.Wait() correctly shuts down
workers.

In `@block/internal/syncing/syncer.go`:
- Around line 198-213: processLoop is started with s.wg.Go before starting
s.daFollower, so if s.daFollower.Start(ctx) fails the goroutine can leak; wrap
the work with a cancellable child context (e.g., childCtx, cancel :=
context.WithCancel(ctx)) and pass childCtx to s.processLoop and to
daFollower.Start, then on Start failure call cancel() and wait for in-flight
goroutines to finish (s.wg.Wait or equivalent) before returning the error;
update usages of s.wg.Go(func() { s.processLoop(ctx) }) and
s.daFollower.Start(ctx) to use childCtx and ensure cancel + wait are performed
on the error path.

In `@test/testda/dummy.go`:
- Around line 66-77: The goroutine unconditionally closes ch on ctx.Done(),
causing a double-close if Reset() already closed it; modify the removal logic in
the ctx.Done() goroutine (which locks d.mu and iterates d.subscribers comparing
s to sub) to only close ch when this goroutine actually removed the subscriber
(e.g., set a removed bool when s==sub, perform d.subscribers = append(...),
break, and then close ch only if removed is true), so Reset() and this goroutine
cannot both close the same channel; reference d.mu, d.subscribers, sub, ch and
Reset() when making the change.

In `@tools/local-da/local.go`:
- Around line 230-231: The subscription events lose blob payloads because
notifySubscribers reads from d.blobData[height] while SubmitWithOptions and
other submit paths only write to d.data and never populate d.blobData; update
the submit code paths (e.g., SubmitWithOptions and the other submit handlers
around the noted regions) to also populate d.blobData[height] with the
corresponding blob list when they set d.data[height] (or move notifySubscribers
to read from d.data if that is the intended single source); ensure you update
the same symbol names (d.data, d.blobData) so notifySubscribers(d.height) will
see the blobs and subscribers receive non-empty blob lists.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 24171a7e-5193-4ec4-9f32-030b0acc2d20

📥 Commits

Reviewing files that changed from the base of the PR and between 2c75e9e and dd7e0cd.

📒 Files selected for processing (26)
  • apps/evm/cmd/run.go
  • apps/evm/server/force_inclusion_test.go
  • apps/grpc/cmd/run.go
  • apps/testapp/cmd/run.go
  • block/internal/da/client.go
  • block/internal/da/interface.go
  • block/internal/da/tracing.go
  • block/internal/da/tracing_test.go
  • block/internal/syncing/da_follower.go
  • block/internal/syncing/da_retriever.go
  • block/internal/syncing/da_retriever_mock.go
  • block/internal/syncing/da_retriever_tracing.go
  • block/internal/syncing/da_retriever_tracing_test.go
  • block/internal/syncing/syncer.go
  • block/internal/syncing/syncer_backoff_test.go
  • block/internal/syncing/syncer_benchmark_test.go
  • block/internal/syncing/syncer_test.go
  • pkg/cmd/run_node.go
  • pkg/da/jsonrpc/client.go
  • pkg/da/types/types.go
  • test/e2e/evm_force_inclusion_e2e_test.go
  • test/e2e/evm_test_common.go
  • test/mocks/da.go
  • test/testda/dummy.go
  • tools/local-da/local.go
  • tools/local-da/rpc.go

@julienrbrt julienrbrt self-requested a review March 4, 2026 17:50
Member

Are we able to use that in the async block retriever?

Member

as discussed in the standup this will be done in a follow-up

@julienrbrt julienrbrt (Member) left a comment

I think we can improve the client constructor a bit so we only need one, and it can handle the case where WebSockets are for some reason not available.


// NewClient connects to the DA RPC endpoint using the address as-is.
// Uses HTTP by default (lazy connection — only connects on first RPC call).
// Does NOT support channel-based subscriptions (e.g. Subscribe).
Member

I don't get it; the clients are effectively the same, just the address changes.
Additionally, WebSockets can be disabled, I believe, so shouldn't we keep this one constructor, check the WebSocket connection, use it if available, and otherwise fall back to HTTP?
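
A single constructor along those lines could probe the WebSocket endpoint and fall back to HTTP when the upgrade fails, roughly as sketched below. The names (dialWS, dialHTTP) and behavior are assumptions about how such a constructor might look, not the repository's actual jsonrpc API:

package jsonrpc

import (
	"context"
	"fmt"
)

// Client is a stand-in for the DA JSON-RPC client in this sketch.
type Client struct {
	streaming bool // whether channel-based Subscribe is available
}

// dialWS and dialHTTP are placeholders for the real connection logic.
func dialWS(ctx context.Context, addr, token, authHeader string) (*Client, error) {
	return &Client{streaming: true}, nil // placeholder: attempt the WS handshake here
}

func dialHTTP(ctx context.Context, addr, token, authHeader string) (*Client, error) {
	return &Client{streaming: false}, nil // placeholder: lazy HTTP connection
}

// NewClient probes WebSockets first and falls back to HTTP (without Subscribe
// support) if the upgrade is unavailable.
func NewClient(ctx context.Context, addr, token, authHeader string) (*Client, error) {
	if c, err := dialWS(ctx, addr, token, authHeader); err == nil {
		return c, nil // streaming-capable client
	}
	c, err := dialHTTP(ctx, addr, token, authHeader)
	if err != nil {
		return nil, fmt.Errorf("connect to DA endpoint %s: %w", addr, err)
	}
	return c, nil
}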

Contributor Author

The websocket connection was causing issues with tests initially. Without the second constructor, we would upgrade everything to WS automatically. Let me check the tests again to see how much effort is left to upgrade them all.

Contributor Author

With the event subscription, we make websocket connections mandatory.

Member

as discussed in the standup this will be done in a follow-up

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

♻️ Duplicate comments (1)
block/internal/syncing/da_follower.go (1)

265-279: ⚠️ Potential issue | 🔴 Critical

Use CAS-based rollback to avoid regressing localNextDAHeight.

Lines 265, 279, and 362 unconditionally Store(...) during rollback. If the other loop advanced localNextDAHeight meanwhile, this can move it backward and reprocess heights out of order.

🛠️ Proposed fix
 		for _, event := range events {
 			if err := f.pipeEvent(ctx, event); err != nil {
 				// Roll back so catchupLoop can retry this height.
-				f.localNextDAHeight.Store(ev.Height)
+				if !f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height) {
+					f.logger.Debug().
+						Uint64("da_height", ev.Height).
+						Uint64("local_next_da_height", f.localNextDAHeight.Load()).
+						Msg("skip rollback: localNextDAHeight already advanced")
+				}
 				f.logger.Warn().Err(err).Uint64("da_height", ev.Height).
 					Msg("failed to pipe inline event, catchup will retry")
 				return
 			}
 		}
@@
 		} else {
 			// No complete events (split namespace, waiting for other half).
-			f.localNextDAHeight.Store(ev.Height)
+			if !f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height) {
+				f.logger.Debug().
+					Uint64("da_height", ev.Height).
+					Uint64("local_next_da_height", f.localNextDAHeight.Load()).
+					Msg("skip rollback: localNextDAHeight already advanced")
+			}
 		}
@@
 		if err := f.fetchAndPipeHeight(ctx, local); err != nil {
 			// Roll back so we can retry after backoff.
-			f.localNextDAHeight.Store(local)
+			if !f.localNextDAHeight.CompareAndSwap(local+1, local) {
+				f.logger.Debug().
+					Uint64("da_height", local).
+					Uint64("local_next_da_height", f.localNextDAHeight.Load()).
+					Msg("skip rollback: localNextDAHeight already advanced")
+			}
 			if !f.waitOnCatchupError(ctx, err, local) {
 				return
 			}
 			continue
 		}
As per coding guidelines: "Be careful with concurrent access to shared state".

Also applies to: 360-363

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 265 - 279, The rollback
unconditionally calls f.localNextDAHeight.Store(...) which can regress the
counter if another goroutine advanced it; change these stores (the ones near
f.localNextDAHeight.Store in the rollback path and the other occurrences noted
around where events are handled and at the other block ~360-363) to a CAS
update: read cur := f.localNextDAHeight.Load(), and only attempt to set via a
CompareAndSwap (or a small CAS loop) to f.localNextDAHeight.CompareAndSwap(cur,
ev.Height) (or loop while cur < ev.Height and CAS fails: reload cur and retry)
so you only advance the value and never move it backward; apply this pattern to
each place that currently calls f.localNextDAHeight.Store(...) during rollback.
🧹 Nitpick comments (1)
tools/local-da/local.go (1)

204-243: Consider extracting shared submit-to-rpc conversion logic.

SubmitWithOptions and Submit now duplicate the same namespace conversion, blob conversion, storage, and notify flow. A small shared helper would reduce drift risk between the two code paths.

Also applies to: 245-284

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tools/local-da/local.go` around lines 204 - 243, SubmitWithOptions and Submit
duplicate namespace conversion, blob->rpc conversion, ID generation, storage
into d.data/d.blobData, timestamp/height bumping and d.notifySubscribers;
extract that shared logic into a single helper (e.g., processAndStoreBlobs or
storeBlobsAtNextHeight) that takes (ctx, ns []byte, blobs []datypes.Blob) and
performs: validate/convert namespace (libshare.NewNamespaceFromBytes), allocate
new height (increment d.height, set d.timestamps[d.height]=d.monotonicTime()),
generate IDs using d.nextID() and d.getHash(), append kvp to d.data[d.height],
build []*blobrpc.Blob via blobrpc.NewBlobV0 and set d.blobData[d.height], then
call d.notifySubscribers(d.height) and return the generated IDs (and error).
Replace the duplicated sections in SubmitWithOptions and Submit to call this
helper and keep logging/validation outside as appropriate.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/evm/cmd/run.go`:
- Around line 63-66: The WS blob client created via
blobrpc.NewWSClient(cmd.Context(), nodeConfig.DA.Address,
nodeConfig.DA.AuthToken, "") is never closed; after verifying err == nil,
immediately schedule closing the client (e.g., defer blobClient.Close()) so the
WebSocket is cleaned up on shutdown and prevents goroutine/resource leaks—place
the defer directly after the successful assignment to blobClient and before
returning from run-related functions.

In `@block/internal/syncing/da_follower.go`:
- Around line 112-115: The catchupLoop is only triggered by catchupSignal via
signalCatchup (called from updateHighest on subscription events), so if the node
starts behind and no new events arrive catchup never runs; modify startup to
initiate catchup immediately: when launching the goroutine(s) (the code that
calls f.wg.Add and starts followLoop and catchupLoop) ensure you either signal
catchup once at startup or start catchupLoop in a mode that performs an initial
bootstrap catchup before waiting on catchupSignal (adjust catchupLoop or call
signalCatchup directly). Apply the same change to the other goroutine startup
sites that spawn catchupLoop/followLoop to guarantee an initial catchup run even
without incoming subscription events.

In `@block/internal/syncing/syncer_backoff_test.go`:
- Around line 241-245: The test is closing over the loop variable h causing all
Run callbacks to see the final value (6); fix by capturing h per-iteration into
a new local (e.g., localH := h) before calling daRetriever.On("RetrieveFromDA",
mock.Anything, h).Run(...), and use that local (localH) inside the Run callback
so fetchedHeights receives the correct values 3,4,5 when runCatchup() executes.

In `@block/internal/syncing/syncer.go`:
- Around line 210-214: On DA follower startup failure you must stop any
previously-started retrievers to avoid goroutine leaks: after
s.daFollower.Start(ctx) returns an error, call the Stop/Close method on any
components started earlier (e.g., s.raftRetriever.Stop(ctx) or the appropriate
shutdown method on other retrievers), handle/ignore its error as needed, then
proceed with s.cancel() and s.wg.Wait(); update the failure branch that
currently only invokes s.cancel() and s.wg.Wait() to also explicitly shut down
s.raftRetriever (and any other started retrievers) before returning the error.

In `@tools/local-da/local.go`:
- Around line 226-236: The code silently ignores errors from
libshare.NewNamespaceFromBytes and blobrpc.NewBlobV0 in SubmitWithOptions and
Submit, which leads to nil rpc blobs and inconsistent state; update both
functions to check and handle those errors by returning a wrapped error (use
fmt.Errorf with context) instead of continuing, ensure you do not append ids or
store kvp entries when NewNamespaceFromBytes fails, and only append to
rpcBlobs/d.data[d.height] after successful blobrpc.NewBlobV0 conversion (refer
to functions SubmitWithOptions and Submit and the NewNamespaceFromBytes and
NewBlobV0 calls).


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7a45cdf6-63a2-47d8-9960-b9ec8716bbc7

📥 Commits

Reviewing files that changed from the base of the PR and between dd7e0cd and 6c1e630.

📒 Files selected for processing (8)
  • apps/evm/cmd/run.go
  • block/internal/syncing/da_follower.go
  • block/internal/syncing/da_retriever.go
  • block/internal/syncing/syncer.go
  • block/internal/syncing/syncer_backoff_test.go
  • block/internal/syncing/syncer_benchmark_test.go
  • block/internal/syncing/syncer_test.go
  • tools/local-da/local.go

@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (3)
block/internal/syncing/da_follower.go (3)

109-120: ⚠️ Potential issue | 🟠 Major

Catchup may never start if no subscription events arrive.

If the node starts behind and no new DA blob event arrives, catchupLoop will block indefinitely on catchupSignal. The signal is only sent by updateHighest, which is only called when subscription events arrive.

Consider seeding highestSeenDAHeight at startup by querying the latest DA height:

🐛 Proposed bootstrap fix
 func (f *daFollower) Start(ctx context.Context) error {
 	ctx, f.cancel = context.WithCancel(ctx)
 
+	// Seed highest DA height on startup so catchup can begin immediately
+	// even if no fresh subscription event arrives right away.
+	if f.client != nil {
+		if latest, err := f.client.GetLatestDAHeight(ctx); err == nil {
+			f.updateHighest(latest)
+		} else {
+			f.logger.Debug().Err(err).Msg("failed to seed latest DA height at startup")
+		}
+	}
+
 	f.wg.Add(2)
 	go f.followLoop(ctx)
 	go f.catchupLoop(ctx)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 109 - 120, The
catchupLoop may never run if no subscription events arrive because
highestSeenDAHeight is only advanced via updateHighest; to fix, during
daFollower.Start seed highestSeenDAHeight (or call updateHighest) by querying
the latest DA height from the DA client before launching followLoop/catchupLoop
so catchupSignal can be triggered even with no incoming events; locate Start
(method daFollower.Start), highestSeenDAHeight, updateHighest, catchupSignal and
ensure Start obtains the current DA head and updates/sets highestSeenDAHeight
(or invokes updateHighest) and emits the catchupSignal if needed before starting
the goroutines.

260-281: ⚠️ Potential issue | 🟠 Major

Use CAS-based rollback to avoid regressing localNextDAHeight.

The unconditional Store(ev.Height) at Lines 265 and 279 can regress localNextDAHeight if catchupLoop has advanced it concurrently:

  1. followLoop: CAS(N, N+1) succeeds
  2. catchupLoop: CAS(N+1, N+2) succeeds
  3. followLoop: pipeEvent fails → Store(N) regresses value from N+2 to N

This could cause duplicate processing of heights N and N+1.

🐛 Proposed CAS-based rollback
 		for _, event := range events {
 			if err := f.pipeEvent(ctx, event); err != nil {
-				// Roll back so catchupLoop can retry this height.
-				f.localNextDAHeight.Store(ev.Height)
+				// Roll back only if catchupLoop hasn't advanced past us.
+				f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)
 				f.logger.Warn().Err(err).Uint64("da_height", ev.Height).
 					Msg("failed to pipe inline event, catchup will retry")
 				return
 			}
 		}
 		if len(events) != 0 {
 			if !f.headReached.Load() && f.localNextDAHeight.Load() > f.highestSeenDAHeight.Load() {
 				f.headReached.Store(true)
 			}
 			f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)).
 				Msg("processed subscription blobs inline (fast path)")
 		} else {
-			// No complete events (split namespace, waiting for other half).
-			f.localNextDAHeight.Store(ev.Height)
+			// No complete events (split namespace, waiting for other half).
+			// Roll back only if catchupLoop hasn't advanced past us.
+			f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 260 - 281, The rollback
after a failed pipeEvent (and the fallback when no complete events)
unconditionally calls f.localNextDAHeight.Store(ev.Height), which can regress
progress if catchupLoop advanced it; change both rollbacks to a CAS that only
sets ev.Height when the current value is ev.Height+1 (i.e., call
f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)) so you only revert
if this goroutine still owns the +1 advance; do the same replacement for the
other Store(ev.Height) path, and optionally log or ignore the CAS failure
instead of forcing the store; locate these changes around the pipeEvent error
handling and the "No complete events" branch in the followLoop code that
manipulates localNextDAHeight.

362-369: ⚠️ Potential issue | 🟠 Major

Same CAS-based rollback issue in catchupLoop.

Line 364 has the same race condition as the inline processing path. If followLoop advances localNextDAHeight via inline processing between the CAS at Line 357 and the error rollback at Line 364, the Store will regress the value.

🐛 Proposed fix
 		if err := f.fetchAndPipeHeight(ctx, local); err != nil {
-			// Roll back so we can retry after backoff.
-			f.localNextDAHeight.Store(local)
+			// Roll back only if followLoop hasn't advanced past us.
+			f.localNextDAHeight.CompareAndSwap(local+1, local)
 			if !f.waitOnCatchupError(ctx, err, local) {
 				return
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 362 - 369, The rollback
in catchupLoop uses f.localNextDAHeight.Store(local) which can regress progress
if followLoop advanced the value; change the rollback to only set
localNextDAHeight back when the current value still equals the expected local
(use an atomic CompareAndSwap or conditional load+store): e.g., check
f.localNextDAHeight.Load() == local (or use
f.localNextDAHeight.CompareAndSwap(expectedLocal, local)) before performing the
Store, so fetchAndPipeHeight/followLoop races won't regress the counter.
🧹 Nitpick comments (2)
test/testda/dummy.go (1)

298-311: Consider moving height increment inside the lock for consistency.

In Submit (line 149), d.height.Add(1) occurs inside the mutex-protected section. Here, it occurs before acquiring d.mu, creating a window where GetLatestDAHeight() returns the new height before the subscription event is sent. This ordering difference could cause subtle timing issues in tests.

♻️ Optional fix for consistency
 		select {
 		case <-ticker.C:
 			now := time.Now()
-			height := d.height.Add(1)
 			d.mu.Lock()
+			height := d.height.Add(1)
 			if d.headers[height] == nil {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/testda/dummy.go` around lines 298 - 311, The height increment
d.height.Add(1) should be moved inside the mutex to match Submit's ordering and
avoid racing GetLatestDAHeight(): acquire d.mu before computing/assigning height
(inside the ticker handler), then update d.headers, call d.notifySubscribers
with the new height, and finally release the lock; ensure you still compute
Timestamp now := time.Now() as needed and keep notifySubscribers invoked while
the lock is held to preserve ordering consistency with Submit and avoid the
window where GetLatestDAHeight() could observe the new height before the
subscription event is emitted.
block/internal/syncing/da_follower.go (1)

88-106: Consider validating required config fields.

NewDAFollower doesn't validate that cfg.Client and cfg.Retriever are non-nil. If either is nil, subsequent calls in runSubscription or fetchAndPipeHeight will panic.

🛡️ Proposed defensive validation
 func NewDAFollower(cfg DAFollowerConfig) DAFollower {
+	if cfg.Client == nil {
+		panic("DAFollowerConfig.Client is required")
+	}
+	if cfg.Retriever == nil {
+		panic("DAFollowerConfig.Retriever is required")
+	}
+	if cfg.PipeEvent == nil {
+		panic("DAFollowerConfig.PipeEvent is required")
+	}
 	dataNs := cfg.DataNamespace
 	if len(dataNs) == 0 {
 		dataNs = cfg.Namespace
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 88 - 106, NewDAFollower
should validate required config fields: check that cfg.Client and cfg.Retriever
are non-nil before constructing the daFollower, returning an error or panicking
early as your codebase convention dictates; update NewDAFollower (and its caller
contract if needed) to validate DAFollowerConfig.Client and
DAFollowerConfig.Retriever and avoid creating a daFollower that will panic later
in runSubscription or fetchAndPipeHeight, ensuring any error message clearly
names the missing field(s) and references the constructor NewDAFollower and the
daFollower struct.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 109-120: The catchupLoop may never run if no subscription events
arrive because highestSeenDAHeight is only advanced via updateHighest; to fix,
during daFollower.Start seed highestSeenDAHeight (or call updateHighest) by
querying the latest DA height from the DA client before launching
followLoop/catchupLoop so catchupSignal can be triggered even with no incoming
events; locate Start (method daFollower.Start), highestSeenDAHeight,
updateHighest, catchupSignal and ensure Start obtains the current DA head and
updates/sets highestSeenDAHeight (or invokes updateHighest) and emits the
catchupSignal if needed before starting the goroutines.
- Around line 260-281: The rollback after a failed pipeEvent (and the fallback
when no complete events) unconditionally calls
f.localNextDAHeight.Store(ev.Height), which can regress progress if catchupLoop
advanced it; change both rollbacks to a CAS that only sets ev.Height when the
current value is ev.Height+1 (i.e., call
f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)) so you only revert
if this goroutine still owns the +1 advance; do the same replacement for the
other Store(ev.Height) path, and optionally log or ignore the CAS failure
instead of forcing the store; locate these changes around the pipeEvent error
handling and the "No complete events" branch in the followLoop code that
manipulates localNextDAHeight.
- Around line 362-369: The rollback in catchupLoop uses
f.localNextDAHeight.Store(local) which can regress progress if followLoop
advanced the value; change the rollback to only set localNextDAHeight back when
the current value still equals the expected local (use an atomic CompareAndSwap
or conditional load+store): e.g., check f.localNextDAHeight.Load() == local (or
use f.localNextDAHeight.CompareAndSwap(expectedLocal, local)) before performing
the Store, so fetchAndPipeHeight/followLoop races won't regress the counter.
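
For the first item above, a minimal sketch of what the seeding could look like. The daFollower fields and the updateHighest/catchupSignal behaviour follow the names in the comment; the latestHead function is a hypothetical stand-in for the real DA client's head query, so treat this as an illustration rather than the PR's implementation.

package sketch

import (
	"context"
	"fmt"
	"sync/atomic"
)

// Stand-ins for the follower pieces named in the comment; the real PR types differ.
type daFollower struct {
	latestHead          func(ctx context.Context) (uint64, error) // hypothetical DA head query
	highestSeenDAHeight atomic.Uint64
	catchupSignal       chan struct{}
}

// updateHighest advances highestSeenDAHeight monotonically and nudges catchupLoop.
func (f *daFollower) updateHighest(h uint64) {
	for cur := f.highestSeenDAHeight.Load(); h > cur; cur = f.highestSeenDAHeight.Load() {
		if f.highestSeenDAHeight.CompareAndSwap(cur, h) {
			select {
			case f.catchupSignal <- struct{}{}: // wake catchupLoop
			default:
			}
			return
		}
	}
}

// Start seeds the DA head before launching followLoop/catchupLoop, so catchup
// has a target even when no subscription event ever arrives.
func (f *daFollower) Start(ctx context.Context) error {
	head, err := f.latestHead(ctx)
	if err != nil {
		return fmt.Errorf("seed DA head: %w", err)
	}
	f.updateHighest(head)
	// go f.followLoop(ctx)
	// go f.catchupLoop(ctx)
	return nil
}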

---

Nitpick comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 88-106: NewDAFollower should validate required config fields:
check that cfg.Client and cfg.Retriever are non-nil before constructing the
daFollower, returning an error or panicking early as your codebase convention
dictates; update NewDAFollower (and its caller contract if needed) to validate
DAFollowerConfig.Client and DAFollowerConfig.Retriever and avoid creating a
daFollower that will panic later in runSubscription or fetchAndPipeHeight,
ensuring any error message clearly names the missing field(s) and references the
constructor NewDAFollower and the daFollower struct.

In `@test/testda/dummy.go`:
- Around line 298-311: The height increment d.height.Add(1) should be moved
inside the mutex to match Submit's ordering and avoid racing
GetLatestDAHeight(): acquire d.mu before computing/assigning height (inside the
ticker handler), then update d.headers, call d.notifySubscribers with the new
height, and finally release the lock; ensure you still compute Timestamp now :=
time.Now() as needed and keep notifySubscribers invoked while the lock is held
to preserve ordering consistency with Submit and avoid the window where
GetLatestDAHeight() could observe the new height before the subscription event
is emitted.
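
For the dummy DA nitpick just above, a small sketch of the locking order it asks for. The dummyDA type here is a stand-in with illustrative fields; the real test/testda/dummy.go and its notifySubscribers helper may differ in detail.

package sketch

import (
	"sync"
	"sync/atomic"
	"time"
)

// Minimal stand-in for the dummy DA used in tests.
type dummyDA struct {
	mu      sync.Mutex
	height  atomic.Uint64
	headers map[uint64][]byte
	subs    []chan uint64
}

// tick advances the height, records a header, and notifies subscribers, all
// while holding d.mu. This matches Submit's ordering, so a concurrent
// GetLatestDAHeight can never observe the new height before the event goes out.
func (d *dummyDA) tick(now time.Time) {
	d.mu.Lock()
	defer d.mu.Unlock()

	height := d.height.Add(1) // increment moved inside the mutex
	d.headers[height] = []byte(now.Format(time.RFC3339))
	for _, ch := range d.subs { // stand-in for notifySubscribers(height)
		select {
		case ch <- height:
		default:
		}
	}
}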

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d6918e55-cf7b-4700-9ded-4f821cec7349

📥 Commits

Reviewing files that changed from the base of the PR and between 6c1e630 and 896c6ad.

📒 Files selected for processing (3)
  • block/internal/syncing/da_follower.go
  • block/internal/syncing/da_retriever_strict_test.go
  • test/testda/dummy.go

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
block/internal/syncing/syncer.go (1)

194-220: ⚠️ Potential issue | 🟠 Major

raftRetriever not stopped on daFollower.Start failure.

If daFollower.Start(ctx) fails at line 218, the deferred cleanup calls s.Stop(). However, Stop() (lines 229-276) doesn't stop raftRetriever, which was successfully started at line 195. This leaves the raftRetriever goroutines running after the failed startup.

🛠️ Proposed fix: Stop raftRetriever in Stop()
 func (s *Syncer) Stop() error {
 	if s.cancel == nil {
 		return nil
 	}
 
 	s.cancel()
 	s.cancelP2PWait(0)
 
 	// Stop the DA follower first (it owns its own goroutines).
 	if s.daFollower != nil {
 		s.daFollower.Stop()
 	}
+
+	// Stop raft retriever if it was started.
+	if s.raftRetriever != nil {
+		s.raftRetriever.Stop()
+	}
 
 	s.wg.Wait()

As per coding guidelines: "Be mindful of goroutine leaks".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/syncer.go` around lines 194 - 220, The raft retriever
started by s.raftRetriever.Start(ctx) isn't being stopped on later startup
failures (e.g., when s.daFollower.Start(ctx) fails); update Stop() to stop/close
the raft retriever to avoid goroutine leaks by checking s.raftRetriever != nil
and calling its Stop/Close method (or the appropriate shutdown function used by
the retriever) and waiting for it to finish (similar to how other workers are
stopped), so that any goroutines spawned by raftRetriever are cleanly terminated
when s.Stop() is invoked after a failed s.daFollower.Start(ctx).
♻️ Duplicate comments (1)
block/internal/syncing/da_follower.go (1)

261-282: ⚠️ Potential issue | 🟡 Minor

Unconditional Store can regress localNextDAHeight if catchupLoop advances concurrently.

Between the successful CAS at line 261 and the rollback at line 266 or 280, catchupLoop could claim and advance localNextDAHeight further. The unconditional Store(ev.Height) would then move the height backward, causing catchupLoop to re-process already-processed heights.

While the downstream processHeightEvent has duplicate detection, this causes unnecessary DA fetches and log noise.

🛡️ Proposed fix using CAS for rollback
 		for _, event := range events {
 			if err := f.pipeEvent(ctx, event); err != nil {
 				// Roll back so catchupLoop can retry this height.
-				f.localNextDAHeight.Store(ev.Height)
+				// Only roll back if we still own this height (no one advanced past us).
+				f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)
 				f.logger.Warn().Err(err).Uint64("da_height", ev.Height).
 					Msg("failed to pipe inline event, catchup will retry")
 				return
 			}
 		}
 		if len(events) != 0 {
 			if !f.headReached.Load() && f.localNextDAHeight.Load() > f.highestSeenDAHeight.Load() {
 				f.headReached.Store(true)
 			}
 			f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)).
 				Msg("processed subscription blobs inline (fast path)")
 		} else {
 			// No complete events (split namespace, waiting for other half).
-			f.localNextDAHeight.Store(ev.Height)
+			// Only roll back if we still own this height.
+			f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 261 - 282, The rollback
currently does an unconditional f.localNextDAHeight.Store(ev.Height) which can
regress progress if catchupLoop advanced the height concurrently; change both
rollback sites (after pipeEvent error and the "No complete events" branch) to
perform a conditional compare-and-swap that only resets the height when the
current value is still ev.Height+1 (e.g., use
f.localNextDAHeight.CompareAndSwap(ev.Height+1, ev.Height)); this preserves a
rollback when we truly still own that slot but avoids moving the pointer
backward if catchupLoop has already advanced it. Ensure you update both
occurrences that reference f.localNextDAHeight.Store(ev.Height) and keep the
existing CAS used earlier as the pattern.
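
As a self-contained illustration of the ownership rule behind these CompareAndSwap rollbacks (independent of the follower code, runnable as-is):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var next atomic.Uint64
	next.Store(10)

	// Fast path claims height 10 by advancing the counter to 11.
	fmt.Println("claimed height 10:", next.CompareAndSwap(10, 11))

	// Meanwhile the other loop races ahead and claims height 11 too.
	next.CompareAndSwap(11, 12)

	// Processing height 10 fails and we want to roll back. An unconditional
	// Store(10) would drag the counter from 12 back to 10 and force height 11
	// to be re-processed. The CAS rollback only succeeds while we still own
	// the +1 advance, so here it is a no-op and progress is preserved.
	fmt.Println("rolled back:", next.CompareAndSwap(11, 10), "next:", next.Load())
}

The rollback succeeds only while the failing loop still owns its +1 advance; once another loop has moved the counter on, the failed CAS leaves that newer progress intact, which is exactly the behaviour the review comments ask for.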
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 363-370: The rollback uses f.localNextDAHeight.Store(local) which
can overwrite a newer value set by followLoop; change the rollback to use a
compare-and-swap against the expected advanced value (i.e., attempt CAS back to
local only if the current value equals the value you observed before calling
f.fetchAndPipeHeight) so you don't regress concurrent progress—locate the block
around f.fetchAndPipeHeight(ctx, local) in followLoop and replace the
unconditional Store with a CAS that checks the current f.localNextDAHeight
against the pre-call observed value before writing local.

---

Outside diff comments:
In `@block/internal/syncing/syncer.go`:
- Around line 194-220: The raft retriever started by s.raftRetriever.Start(ctx)
isn't being stopped on later startup failures (e.g., when
s.daFollower.Start(ctx) fails); update Stop() to stop/close the raft retriever
to avoid goroutine leaks by checking s.raftRetriever != nil and calling its
Stop/Close method (or the appropriate shutdown function used by the retriever)
and waiting for it to finish (similar to how other workers are stopped), so that
any goroutines spawned by raftRetriever are cleanly terminated when s.Stop() is
invoked after a failed s.daFollower.Start(ctx).

---

Duplicate comments:
In `@block/internal/syncing/da_follower.go`:
- Around line 261-282: The rollback currently does an unconditional
f.localNextDAHeight.Store(ev.Height) which can regress progress if catchupLoop
advanced the height concurrently; change both rollback sites (after pipeEvent
error and the "No complete events" branch) to perform a conditional
compare-and-swap that only resets the height when the current value is still
ev.Height+1 (e.g., use f.localNextDAHeight.CompareAndSwap(ev.Height+1,
ev.Height)); this preserves a rollback when we truly still own that slot but
avoids moving the pointer backward if catchupLoop has already advanced it.
Ensure you update both occurrences that reference
f.localNextDAHeight.Store(ev.Height) and keep the existing CAS used earlier as
the pattern.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 210d6897-6e80-45e2-8b65-a18f662facaf

📥 Commits

Reviewing files that changed from the base of the PR and between 896c6ad and 191ae01.

📒 Files selected for processing (3)
  • apps/evm/cmd/run.go
  • block/internal/syncing/da_follower.go
  • block/internal/syncing/syncer.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • apps/evm/cmd/run.go

Comment on lines +363 to +370
if err := f.fetchAndPipeHeight(ctx, local); err != nil {
	// Roll back so we can retry after backoff.
	f.localNextDAHeight.Store(local)
	if !f.waitOnCatchupError(ctx, err, local) {
		return
	}
	continue
}

⚠️ Potential issue | 🟡 Minor

Same Store rollback pattern that can regress localNextDAHeight.

Line 365 has the same issue as the inline processing path: between CAS at line 358 and the error handler at line 365, followLoop could have advanced localNextDAHeight via its own CAS. Use CAS for rollback here as well.

🛡️ Proposed fix
 		if err := f.fetchAndPipeHeight(ctx, local); err != nil {
 			// Roll back so we can retry after backoff.
-			f.localNextDAHeight.Store(local)
+			// Only roll back if followLoop hasn't advanced past us.
+			f.localNextDAHeight.CompareAndSwap(local+1, local)
 			if !f.waitOnCatchupError(ctx, err, local) {
 				return
 			}
 			continue
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/syncing/da_follower.go` around lines 363 - 370, The rollback
uses f.localNextDAHeight.Store(local) which can overwrite a newer value set by
followLoop; change the rollback to use a compare-and-swap against the expected
advanced value (i.e., attempt CAS back to local only if the current value equals
the value you observed before calling f.fetchAndPipeHeight) so you don't regress
concurrent progress—locate the block around f.fetchAndPipeHeight(ctx, local) in
followLoop and replace the unconditional Store with a CAS that checks the
current f.localNextDAHeight against the pre-call observed value before writing
local.

@julienrbrt
Member

It is still unclear to me whether a celestia-node allows disabling WebSockets. If so, then I still believe some checks in the constructor would be handy.
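
For reference, a rough sketch of the kind of constructor check this could mean: reject endpoints that are not WebSocket URLs and probe a subscription once so a node with WebSockets disabled fails at construction time instead of deep inside the follow loop. NewWebSocketClient, the dial parameter, and the Subscribe probe are purely illustrative and not the actual pkg/da API.

package sketch

import (
	"context"
	"fmt"
	"strings"
	"time"
)

// subscriber stands in for whatever the DA client exposes for streaming; the
// Subscribe signature is hypothetical.
type subscriber interface {
	Subscribe(ctx context.Context, namespace []byte) (cancel func(), err error)
}

// NewWebSocketClient validates the endpoint up front so misconfiguration is
// reported at startup rather than as a stalled subscription later.
func NewWebSocketClient(ctx context.Context, addr string, namespace []byte, dial func(addr string) (subscriber, error)) (subscriber, error) {
	if !strings.HasPrefix(addr, "ws://") && !strings.HasPrefix(addr, "wss://") {
		return nil, fmt.Errorf("da client: %q is not a WebSocket endpoint", addr)
	}
	c, err := dial(addr)
	if err != nil {
		return nil, fmt.Errorf("da client: dial %s: %w", addr, err)
	}
	// Probe once; a server with subscriptions disabled is rejected here.
	probeCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	closeSub, err := c.Subscribe(probeCtx, namespace)
	if err != nil {
		return nil, fmt.Errorf("da client: subscriptions unavailable (WebSockets disabled?): %w", err)
	}
	closeSub()
	return c, nil
}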
